181 lines
6.0 KiB
Nim
181 lines
6.0 KiB
Nim
# Task Runner
|
|
# adapted from
|
|
# Chronos
|
|
# (github.com/status-im/nim-chronos/pull/45)
|
|
#
|
|
# (c) Copyright 2015 Dominik Picheta
|
|
# (c) Copyright 2018-Present Status Research & Development GmbH
|
|
#
|
|
# Licensed under either of
|
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
# MIT license (LICENSE-MIT)
|
|
import os
|
|
|
|
import chronos/[asyncloop, selectors2]
|
|
export asyncloop
|
|
|
|
when defined(windows):
|
|
import winlean, sets, hashes
|
|
|
|
when defined(windows) or defined(nimdoc):
|
|
type
|
|
RwfsoOverlapped* = object of CustomOverlapped
|
|
ioPort*: Handle
|
|
handle*: Handle
|
|
waitFd*: Handle
|
|
timerOrWait*: WINBOOL
|
|
|
|
RefRwfsoOverlapped* = ref RwfsoOverlapped
|
|
|
|
when defined(windows):
|
|
|
|
{.push stackTrace:off.}
|
|
proc waitCallback(param: pointer,
|
|
timerOrWaitFired: WINBOOL): void {.stdcall.} =
|
|
var p = cast[RefRwfsoOverlapped](param)
|
|
p.timerOrWait = timerOrWaitFired
|
|
discard postQueuedCompletionStatus(p.ioPort, DWORD(timerOrWaitFired),
|
|
ULONG_PTR(p.handle),
|
|
cast[pointer](p))
|
|
{.pop.}
|
|
|
|
proc awaitForSingleObject*(handle: Handle, timeout: Duration): Future[bool] =
|
|
## Wait for Windows' waitable handle (handle which can be waited via
|
|
## WaitForSingleObject API call) in asynchronous way.
|
|
## Procedure returns ``true`` if state of handle ``handle`` become
|
|
## signalled, and ``false`` if timeout ``timeout`` was expired before
|
|
## handle ``handle`` become signaled.
|
|
##
|
|
## ``handle`` can be one of the listed types: Change notification,
|
|
## Console input, Event, Memory resource notification, Mutex, Process,
|
|
## Semaphore, Thread, Waitable timer.
|
|
##
|
|
## If timeout ``timeout`` is ``ZeroDuration`` procedure will check if
|
|
## handle is signalled and return immediately.
|
|
var retFuture = newFuture[bool]("chronos.awaitForSingleObject")
|
|
var loop = getThreadDispatcher()
|
|
|
|
var povl: RefRwfsoOverlapped
|
|
var flags = DWORD(WT_EXECUTEONLYONCE)
|
|
var timems: ULONG
|
|
|
|
if timeout == ZeroDuration:
|
|
let res = waitForSingleObject(handle, 0)
|
|
if res == WAIT_TIMEOUT:
|
|
retFuture.complete(false)
|
|
return retFuture
|
|
elif res == WAIT_OBJECT_0:
|
|
retFuture.complete(true)
|
|
return retFuture
|
|
else:
|
|
retFuture.fail(newException(AsyncError,
|
|
"Mutex object was not released"))
|
|
return retFuture
|
|
else:
|
|
if timeout == InfiniteDuration:
|
|
timems = INFINITE
|
|
else:
|
|
timems = ULONG(timeout.milliseconds)
|
|
|
|
povl = RefRwfsoOverlapped()
|
|
GC_ref(povl)
|
|
|
|
proc handleContinuation(udata: pointer) {.gcsafe.} =
|
|
if not(retFuture.finished()):
|
|
loop.handles.excl(AsyncFD(handle))
|
|
if unregisterWait(povl.waitFd) == 0:
|
|
let err = osLastError()
|
|
if int(err) != ERROR_IO_PENDING:
|
|
GC_unref(povl)
|
|
retFuture.fail(newException(OSError, osErrorMsg(err)))
|
|
return
|
|
|
|
if povl.timerOrWait != 0:
|
|
GC_unref(povl)
|
|
retFuture.complete(false)
|
|
else:
|
|
GC_unref(povl)
|
|
retFuture.complete(true)
|
|
|
|
proc cancel(udata: pointer) {.gcsafe.} =
|
|
if not(retFuture.finished()):
|
|
loop.handles.excl(AsyncFD(handle))
|
|
discard unregisterWait(povl.waitFd)
|
|
GC_unref(povl)
|
|
|
|
povl.data = CompletionData(fd: AsyncFD(handle), cb: handleContinuation)
|
|
povl.ioPort = loop.getIoHandler()
|
|
povl.handle = handle
|
|
loop.handles.incl(AsyncFD(handle))
|
|
if not registerWaitForSingleObject(addr povl.waitFd, povl.handle,
|
|
cast[WAITORTIMERCALLBACK](waitCallback),
|
|
cast[pointer](povl), timems, flags):
|
|
let err = osLastError()
|
|
GC_unref(povl)
|
|
loop.handles.excl(AsyncFD(handle))
|
|
retFuture.fail(newException(OSError, osErrorMsg(err)))
|
|
|
|
retFuture.cancelCallback = cancel
|
|
return retFuture
|
|
|
|
else:
|
|
|
|
proc getFd*(event: SelectEvent): cint =
|
|
type
|
|
EventType = object
|
|
fd: cint
|
|
PEventType = ptr EventType
|
|
var e = cast[PEventType](event)
|
|
result = e.fd
|
|
|
|
proc awaitForSelectEvent*(event: SelectEvent,
|
|
timeout: Duration): Future[bool] =
|
|
## Wait for Selectors' event SelectEvent in asynchronous way.
|
|
##
|
|
## Procedure returns ``true`` if state of event ``event`` become
|
|
## signalled, and ``false`` if timeout ``timeout`` occurs before
|
|
## event ``event`` become signaled.
|
|
var retFuture = newFuture[bool]("chronos.awaitForSelectEvent")
|
|
let loop = getThreadDispatcher()
|
|
var data: SelectorData
|
|
var moment: Moment
|
|
|
|
proc handleContinuation(udata: pointer) {.gcsafe.} =
|
|
try:
|
|
if not(retFuture.finished()):
|
|
loop.selector.unregister(event)
|
|
if isNil(udata):
|
|
retFuture.complete(false)
|
|
else:
|
|
retFuture.complete(true)
|
|
except IOSelectorsException as e:
|
|
raise newException(Defect, e.msg)
|
|
|
|
proc cancellation(udata: pointer) {.gcsafe.} =
|
|
try:
|
|
if not(retFuture.finished()):
|
|
loop.selector.unregister(event)
|
|
if timeout != InfiniteDuration:
|
|
removeTimer(moment, handleContinuation, nil)
|
|
except IOSelectorsException as e:
|
|
raise newException(Defect, e.msg)
|
|
|
|
if timeout != InfiniteDuration:
|
|
moment = Moment.fromNow(timeout)
|
|
discard setTimer(moment, handleContinuation, nil)
|
|
|
|
let fd = event.getFd()
|
|
loop.selector.registerEvent(event, data)
|
|
|
|
withData(loop.selector, int(fd), adata) do:
|
|
adata.reader = AsyncCallback(function: handleContinuation,
|
|
udata: addr adata.rdata)
|
|
adata.rdata.fd = AsyncFD(fd)
|
|
adata.rdata.udata = nil
|
|
do:
|
|
retFuture.fail(newException(ValueError,
|
|
"Event descriptor not registered."))
|
|
|
|
retFuture.cancelCallback = cancellation
|
|
return retFuture
|